<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script id="configuration-template" type="text/x-handlebars-template">
  Kafka uses key-value pairs in the <a href="http://en.wikipedia.org/wiki/.properties">property file format</a> for configuration. These values can be supplied either from a file or programmatically.

  <h3><a id="brokerconfigs" href="#brokerconfigs">3.1 Broker Configs</a></h3>

  The essential configurations are the following:
  <ul>
      <li><code>broker.id</code></li>
      <li><code>log.dirs</code></li>
      <li><code>zookeeper.connect</code></li>
  </ul>
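
  As a minimal sketch, a single-broker <code>server.properties</code> might set these three values as follows. The values shown are illustrative assumptions for a local test setup, not recommendations:
  <pre class="brush: text;">
  # Unique integer id of this broker within the cluster
  broker.id=0
  # Comma-separated list of directories in which log data is stored
  log.dirs=/tmp/kafka-logs
  # ZooKeeper connection string in hostname:port form
  zookeeper.connect=localhost:2181
  </pre>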

  Topic-level configurations and defaults are discussed in more detail <a href="#topicconfigs">below</a>.

  <!--#include virtual="generated/kafka_config.html" -->

  <p>More details about broker configuration can be found in the Scala class <code>kafka.server.KafkaConfig</code>.</p>

  <h4><a id="dynamicbrokerconfigs" href="#dynamicbrokerconfigs">3.1.1 Updating Broker Configs</a></h4>
  From Kafka version 1.1 onwards, some of the broker configs can be updated without restarting the broker. See the
  <code>Dynamic Update Mode</code> column in <a href="#brokerconfigs">Broker Configs</a> for the update mode of each broker config.
  <ul>
    <li><code>read-only</code>: Requires a broker restart for update</li>
    <li><code>per-broker</code>: May be updated dynamically for each broker</li>
    <li><code>cluster-wide</code>: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.</li>
  </ul>

  To alter the current broker configs for broker id 0 (for example, the number of log cleaner threads):
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
  </pre>

  To describe the current dynamic broker configs for broker id 0:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
  </pre>

  To delete a config override and revert to the statically configured or default value for broker id 0 (for example,
  the number of log cleaner threads):
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
  </pre>

  Some configs may be configured as a cluster-wide default to maintain consistent values across the whole cluster.  All brokers
  in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
  </pre>

  To describe the currently configured dynamic cluster-wide default configs:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
  </pre>

  All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing).
  If a config value is defined at different levels, the following order of precedence is used:
  <ul>
  <li>Dynamic per-broker config stored in ZooKeeper</li>
  <li>Dynamic cluster-wide default config stored in ZooKeeper</li>
  <li>Static broker config from <code>server.properties</code></li>
  <li>Kafka default, see <a href="#brokerconfigs">broker configs</a></li>
  </ul>
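
  As an illustration of this ordering, assume (hypothetically) that <code>server.properties</code> on every broker sets <code>log.cleaner.threads=1</code> and that the two dynamic updates below are then applied:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=4
  </pre>
  Under the precedence rules above, broker 0 would use 4 (its per-broker value), while every other broker would use 2 (the cluster-wide default). On broker 0, the static value of 1 would apply again only after both dynamic overrides are deleted.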

  <h5>Updating Password Configs Dynamically</h5>
  <p>Password config values that are dynamically updated are encrypted before being stored in ZooKeeper. The broker config
  <code>password.encoder.secret</code> must be configured in <code>server.properties</code> to enable dynamic update
  of password configs. The secret may be different on different brokers.</p>
  <p>The secret used for password encoding may be rotated with a rolling restart of brokers. The old secret used for encoding
  passwords currently in ZooKeeper must be provided in the static broker config <code>password.encoder.old.secret</code> and
  the new secret must be provided in <code>password.encoder.secret</code>. All dynamic password configs stored in ZooKeeper
  will be re-encoded with the new secret when the broker starts up.</p>
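  <p>For example, a rotation could be sketched with the following <code>server.properties</code> fragment, applied to each broker before its restart. Both secret values are placeholders:</p>
  <pre class="brush: text;">
  # Secret that was used to encode the passwords currently stored in ZooKeeper
  password.encoder.old.secret=old-encoder-secret
  # New secret used to re-encode them when the broker starts up
  password.encoder.secret=new-encoder-secret
  </pre>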
  <p>In Kafka 1.1.x, all dynamically updated password configs must be provided in every alter request when updating configs
  using <code>kafka-configs.sh</code> even if the password config is not being altered. This constraint will be removed in
  a future release.</p>

  <h5>Updating SSL Keystore of an Existing Listener</h5>
  Brokers may be configured with SSL keystores with short validity periods to reduce the risk of compromised certificates.
  Keystores may be updated dynamically without restarting the broker. The config name must be prefixed with the listener prefix
  <code>listener.name.{listenerName}.</code> so that only the keystore config of a specific listener is updated.
  The following configs may be updated in a single alter request at per-broker level:
  <ul>
    <li><code>ssl.keystore.type</code></li>
    <li><code>ssl.keystore.location</code></li>
    <li><code>ssl.keystore.password</code></li>
    <li><code>ssl.key.password</code></li>
  </ul>
  If the listener is the inter-broker listener, the update is allowed only if the new keystore is trusted by the truststore
  configured for that listener. For other listeners, no trust validation is performed on the keystore by the broker. Certificates
  must be signed by the same certificate authority that signed the old certificate to avoid any client authentication failures.
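
  As a sketch, the keystore of a listener named <code>external</code> on broker 0 could be replaced with a request like the one below. The listener name, file path and passwords are hypothetical placeholders, and the example assumes <code>password.encoder.secret</code> is already configured, since the request contains password configs:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.external.ssl.keystore.location=/path/to/new.keystore.jks,listener.name.external.ssl.keystore.password=new-keystore-secret,listener.name.external.ssl.key.password=new-key-secret
  </pre>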

  <h5>Updating Default Topic Configuration</h5>
  Default topic configuration options used by brokers may be updated without broker restart. The configs are applied to topics
  without a topic config override for the equivalent per-topic config. One or more of these configs may be overridden at
  cluster-default level used by all brokers.
  <ul>
    <li><code>log.segment.bytes</code></li>
    <li><code>log.roll.ms</code></li>
    <li><code>log.roll.hours</code></li>
    <li><code>log.roll.jitter.ms</code></li>
    <li><code>log.roll.jitter.hours</code></li>
    <li><code>log.index.size.max.bytes</code></li>
    <li><code>log.flush.interval.messages</code></li>
    <li><code>log.flush.interval.ms</code></li>
    <li><code>log.retention.bytes</code></li>
    <li><code>log.retention.ms</code></li>
    <li><code>log.retention.minutes</code></li>
    <li><code>log.retention.hours</code></li>
    <li><code>log.index.interval.bytes</code></li>
    <li><code>log.cleaner.delete.retention.ms</code></li>
    <li><code>log.cleaner.min.compaction.lag.ms</code></li>
    <li><code>log.cleaner.min.cleanable.ratio</code></li>
    <li><code>log.cleanup.policy</code></li>
    <li><code>log.segment.delete.delay.ms</code></li>
    <li><code>unclean.leader.election.enable</code></li>
    <li><code>min.insync.replicas</code></li>
    <li><code>max.message.bytes</code></li>
    <li><code>compression.type</code></li>
    <li><code>log.preallocate</code></li>
    <li><code>log.message.timestamp.type</code></li>
    <li><code>log.message.timestamp.difference.max.ms</code></li>
  </ul>
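
  For example, a cluster-wide default retention of one day could be set as follows. The value is illustrative only; topics that have their own <code>retention.ms</code> override are not affected:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.retention.ms=86400000
  </pre>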

  In Kafka version 1.1.x, changes to <code>unclean.leader.election.enable</code> take effect only when a new controller is elected.
  Controller re-election may be forced by running:

  <pre class="brush: bash;">
  &gt; bin/zookeeper-shell.sh localhost
  rmr /controller
  </pre>

  <h5>Updating Log Cleaner Configs</h5>
  Log cleaner configs may be updated dynamically at cluster-default level used by all brokers. The changes take effect
  on the next iteration of log cleaning. One or more of these configs may be updated:
  <ul>
    <li><code>log.cleaner.threads</code></li>
    <li><code>log.cleaner.io.max.bytes.per.second</code></li>
    <li><code>log.cleaner.dedupe.buffer.size</code></li>
    <li><code>log.cleaner.io.buffer.size</code></li>
    <li><code>log.cleaner.io.buffer.load.factor</code></li>
    <li><code>log.cleaner.backoff.ms</code></li>
  </ul>

  <h5>Updating Thread Configs</h5>
  The size of various thread pools used by the broker may be updated dynamically at cluster-default level used by all brokers.
  Updates are restricted to the range <code>currentSize / 2</code> to <code>currentSize * 2</code> to ensure that config updates are
  handled gracefully.
  <ul>
    <li><code>num.network.threads</code></li>
    <li><code>num.io.threads</code></li>
    <li><code>num.replica.fetchers</code></li>
    <li><code>num.recovery.threads.per.data.dir</code></li>
    <li><code>log.cleaner.threads</code></li>
    <li><code>background.threads</code></li>
  </ul>
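
  As a worked example of this restriction, assume <code>num.io.threads</code> is currently 8. A single update may then set it to any value from 4 to 16, so growing the pool to 32 would take two steps:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config num.io.threads=16
  &gt; bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config num.io.threads=32
  </pre>
  After the first update the current size is 16, so the permitted range becomes 8 to 32 and the second update is accepted.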

  <h5>Adding and Removing Listeners</h5>
  <p>Listeners may be added or removed dynamically. When a new listener is added, security configs of the listener must be provided
  as listener configs with the listener prefix <code>listener.name.{listenerName}.</code>. If the new listener uses SASL,
  the JAAS configuration of the listener must be provided using the JAAS configuration property <code>sasl.jaas.config</code>
  with the listener and mechanism prefix. See <a href="#security_jaas_broker">JAAS configuration for Kafka brokers</a> for details.</p>
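
  <p>As an illustration of the naming scheme, the configs of a hypothetical SASL_SSL listener named <code>external</code> using the SCRAM-SHA-256 mechanism might look like this. The listener name, path and credentials are placeholders:</p>
  <pre class="brush: text;">
  # Security config with the listener prefix
  listener.name.external.ssl.keystore.location=/path/to/external.keystore.jks
  # JAAS config with the listener and mechanism prefix (mechanism name in lower case)
  listener.name.external.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
  </pre>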

  <p>In Kafka version 1.1.x, the listener used for inter-broker communication may not be updated dynamically. To move inter-broker
  traffic to a new listener, first add the new listener on all brokers without restarting them. A rolling restart is then
  required to update <code>inter.broker.listener.name</code>.</p>

  In addition to all the security configs of new listeners, the following configs may be updated dynamically at per-broker level:
  <ul>
    <li><code>listeners</code></li>
    <li><code>advertised.listeners</code></li>
    <li><code>listener.security.protocol.map</code></li>
  </ul>
  The inter-broker listener must be configured using the static broker configuration <code>inter.broker.listener.name</code>
  or <code>inter.broker.security.protocol</code>.

  <h3><a id="topicconfigs" href="#topicconfigs">3.2 Topic-Level Configs</a></h3>

  Configurations pertinent to topics have both a server default and an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
  <pre class="brush: bash;">
  &gt; bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 \
      --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
  </pre>
  Overrides can also be changed or set later using the alter configs command. This example updates the max message size for <i>my-topic</i>:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic \
      --alter --add-config max.message.bytes=128000
  </pre>

  To check the overrides set on a topic, run:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
  </pre>

  To remove an override, run:
  <pre class="brush: bash;">
  &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
  </pre>

  The following are the topic-level configurations. The server's default configuration for each property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override.

  <!--#include virtual="generated/topic_config.html" -->

  <h3><a id="producerconfigs" href="#producerconfigs">3.3 Producer Configs</a></h3>

  Below is the configuration of the Java producer:
  <!--#include virtual="generated/producer_config.html" -->

  <p>
      For those interested in the legacy Scala producer configs, information can be found <a href="http://kafka.apache.org/082/documentation.html#producerconfigs">
      here</a>.
  </p>

  <h3><a id="consumerconfigs" href="#consumerconfigs">3.4 Consumer Configs</a></h3>

  In 0.9.0.0 we introduced the new Java consumer as a replacement for the older Scala-based simple and high-level consumers.
  The configs for both new and old consumers are described below.

  <h4><a id="newconsumerconfigs" href="#newconsumerconfigs">3.4.1 New Consumer Configs</a></h4>
  Below is the configuration for the new consumer:
  <!--#include virtual="generated/consumer_config.html" -->

  <h4><a id="oldconsumerconfigs" href="#oldconsumerconfigs">3.4.2 Old Consumer Configs</a></h4>

  The essential old consumer configurations are the following:
  <ul>
          <li><code>group.id</code></li>
          <li><code>zookeeper.connect</code></li>
  </ul>
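
  A minimal old-consumer properties sketch covering these two settings could look like the following. The group name and ZooKeeper address are assumptions for a local setup:
  <pre class="brush: text;">
  # All consumer processes sharing this id form one consumer group
  group.id=my-group
  # ZooKeeper connection string in hostname:port form
  zookeeper.connect=localhost:2181
  </pre>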

  <table class="data-table">
  <tbody><tr>
          <th>Property</th>
          <th>Default</th>
          <th>Description</th>
  </tr>
      <tr>
        <td>group.id</td>
        <td colspan="1"></td>
        <td>A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.</td>
      </tr>
      <tr>
        <td>zookeeper.connect</td>
        <td colspan="1"></td>
            <td>Specifies the ZooKeeper connection string in the form <code>hostname:port</code>, where hostname and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down, you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.
          <p>
      The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string, which puts its data under some path in the global ZooKeeper namespace. If so, the consumer should use the same chroot path in its connection string. For example, to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>.</td>
      </tr>
      <tr>
        <td>consumer.id</td>
        <td colspan="1">null</td>
        <td>
          <p>Generated automatically if not set.</p>
      </td>
      </tr>
      <tr>
        <td>socket.timeout.ms</td>
        <td colspan="1">30 * 1000</td>
        <td>The socket timeout for network requests. The actual timeout set will be fetch.wait.max.ms + socket.timeout.ms.</td>
      </tr>
      <tr>
        <td>socket.receive.buffer.bytes</td>
        <td colspan="1">64 * 1024</td>
        <td>The socket receive buffer for network requests</td>
      </tr>
      <tr>
        <td>fetch.message.max.bytes</td>
        <td nowrap>1024 * 1024</td>
        <td>The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch.</td>
      </tr>
      <tr>
        <td>num.consumer.fetchers</td>
        <td colspan="1">1</td>
        <td>The number of fetcher threads used to fetch data.</td>
      </tr>
      <tr>
        <td>auto.commit.enable</td>
        <td colspan="1">true</td>
        <td>If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.</td>
      </tr>
      <tr>
        <td>auto.commit.interval.ms</td>
        <td colspan="1">60 * 1000</td>
        <td>The frequency in ms that the consumer offsets are committed to ZooKeeper.</td>
      </tr>
      <tr>
        <td>queued.max.message.chunks</td>
        <td colspan="1">2</td>
        <td>Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.</td>
      </tr>
      <tr>
        <td>rebalance.max.retries</td>
        <td colspan="1">4</td>
        <td>When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.</td>
      </tr>
      <tr>
        <td>fetch.min.bytes</td>
        <td colspan="1">1</td>
        <td>The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.</td>
      </tr>
      <tr>
        <td>fetch.wait.max.ms</td>
        <td colspan="1">100</td>
        <td>The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes</td>
      </tr>
      <tr>
        <td>rebalance.backoff.ms</td>
        <td>2000</td>
        <td>Backoff time between retries during rebalance. If not set explicitly, the value in zookeeper.sync.time.ms is used.
        </td>
      </tr>
      <tr>
        <td>refresh.leader.backoff.ms</td>
        <td colspan="1">200</td>
        <td>Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.</td>
      </tr>
      <tr>
        <td>auto.offset.reset</td>
        <td colspan="1">largest</td>
        <td>
          <p>What to do when there is no initial offset in ZooKeeper or if an offset is out of range:<br/>* smallest : automatically reset the offset to the smallest offset<br/>* largest : automatically reset the offset to the largest offset<br/>* anything else: throw exception to the consumer</p>
      </td>
      </tr>
      <tr>
        <td>consumer.timeout.ms</td>
        <td colspan="1">-1</td>
        <td>Throw a timeout exception to the consumer if no message is available for consumption after the specified interval</td>
      </tr>
      <tr>
        <td>exclude.internal.topics</td>
        <td colspan="1">true</td>
        <td>Whether messages from internal topics (such as offsets) should be excluded from the consumer. If set to false, messages from internal topics are exposed to the consumer.</td>
      </tr>
      <tr>
        <td>client.id</td>
        <td colspan="1">group id value</td>
        <td>The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.</td>
      </tr>
      <tr>
        <td>zookeeper.session.timeout.ms</td>
        <td colspan="1">6000</td>
        <td>ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.</td>
      </tr>
      <tr>
        <td>zookeeper.connection.timeout.ms</td>
        <td colspan="1">6000</td>
        <td>The max time that the client waits while establishing a connection to ZooKeeper.</td>
      </tr>
      <tr>
        <td>zookeeper.sync.time.ms</td>
        <td colspan="1">2000</td>
        <td>How far a ZooKeeper follower can be behind a ZooKeeper leader.</td>
      </tr>
      <tr>
        <td>offsets.storage</td>
        <td colspan="1">zookeeper</td>
        <td>Select where offsets should be stored (zookeeper or kafka).</td>
      </tr>
      <tr>
        <td>offsets.channel.backoff.ms</td>
        <td colspan="1">1000</td>
        <td>The backoff period when reconnecting the offsets channel or retrying failed offset fetch/commit requests.</td>
      </tr>
      <tr>
        <td>offsets.channel.socket.timeout.ms</td>
        <td colspan="1">10000</td>
        <td>Socket timeout when reading responses for offset fetch/commit requests. This timeout is also used for ConsumerMetadata requests that are used to query for the offset manager.</td>
      </tr>
      <tr>
        <td>offsets.commit.max.retries</td>
        <td colspan="1">5</td>
        <td>Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during shut-down. It does not apply to commits originating from the auto-commit thread. It also does not apply to attempts to query for the offset coordinator before committing offsets. That is, if a consumer metadata request fails for any reason, it will be retried and that retry does not count toward this limit.</td>
      </tr>
      <tr>
        <td>dual.commit.enabled</td>
        <td colspan="1">true</td>
        <td>If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated to the new version that commits offsets to the broker (instead of directly to ZooKeeper).</td>
      </tr>
      <tr>
        <td>partition.assignment.strategy</td>
        <td colspan="1">range</td>
        <td><p>Select between the "range" or "roundrobin" strategy for assigning partitions to consumer streams.<p>The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.) Round-robin assignment is permitted only if: (a) Every topic has the same number of streams within a consumer instance (b) The set of subscribed topics is identical for every consumer instance within the group.<p> Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.</td>
      </tr>
  </tbody>
  </table>
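
  <p>As a worked example of the <code>range</code> strategy described above, assume a topic with 10 partitions (0 to 9) and three consumer threads, here given the hypothetical names C1-0, C2-0 and C2-1. Dividing 10 by 3 gives 3 with a remainder of 1, so the first thread in lexicographic order receives one extra partition:</p>
  <pre class="brush: text;">
  C1-0: partitions 0, 1, 2, 3
  C2-0: partitions 4, 5, 6
  C2-1: partitions 7, 8, 9
  </pre>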


  <p>More details about consumer configuration can be found in the Scala class <code>kafka.consumer.ConsumerConfig</code>.</p>

  <h3><a id="connectconfigs" href="#connectconfigs">3.5 Kafka Connect Configs</a></h3>
  Below is the configuration of the Kafka Connect framework.
  <!--#include virtual="generated/connect_config.html" -->

  <h3><a id="streamsconfigs" href="#streamsconfigs">3.6 Kafka Streams Configs</a></h3>
  Below is the configuration of the Kafka Streams client library.
  <!--#include virtual="generated/streams_config.html" -->

  <h3><a id="adminclientconfigs" href="#adminclientconfigs">3.7 AdminClient Configs</a></h3>
  Below is the configuration of the Kafka Admin client library.
  <!--#include virtual="generated/admin_client_config.html" -->
</script>

<div class="p-configuration"></div>
